langgraphgo API 参考文档 #

目录 #

  1. 简介
  2. 核心图类型
  3. 状态图 API
  4. 消息图 API
  5. 预构建组件
  6. 检查点系统
  7. 流式处理
  8. 工具集成
  9. 状态模式与架构
  10. 配置与上下文

简介 #

langgraphgo 是一个强大的 Go 语言库,用于构建基于图的 AI 应用程序。它提供了灵活的状态管理、并发执行、检查点持久化和流式处理功能。本文档详细介绍了所有公共 API 接口。

核心图类型 #

图的基本概念 #

langgraphgo 提供两种主要的图类型:状态图(StateGraph)和消息图(MessageGraph)。这两种图都支持并行执行、条件边和中断机制。

classDiagram
class StateGraph {
+map~string,Node~ nodes
+[]Edge edges
+map~string,func~ conditionalEdges
+string entryPoint
+RetryPolicy retryPolicy
+StateMerger stateMerger
+StateSchema Schema
+AddNode(name, fn) void
+AddEdge(from, to) void
+AddConditionalEdge(from, condition) void
+SetEntryPoint(name) void
+SetRetryPolicy(policy) void
+SetStateMerger(merger) void
+SetSchema(schema) void
+Compile() StateRunnable
}
class MessageGraph {
+map~string,Node~ nodes
+[]Edge edges
+map~string,func~ conditionalEdges
+string entryPoint
+StateMerger stateMerger
+StateSchema Schema
+AddNode(name, fn) void
+AddEdge(from, to) void
+AddConditionalEdge(from, condition) void
+SetEntryPoint(name) void
+SetStateMerger(merger) void
+SetSchema(schema) void
+Compile() Runnable
}
class Node {
+string Name
+func Function
}
class Edge {
+string From
+string To
}
StateGraph --> Node : "包含"
StateGraph --> Edge : "包含"
MessageGraph --> Node : "包含"
MessageGraph --> Edge : "包含"

图表来源

状态图 API #

StateGraph 构造函数 #

NewStateGraph() #

创建一个新的状态图实例。

签名: func NewStateGraph() *StateGraph

返回值: 新的状态图实例

使用示例:

workflow := graph.NewStateGraph()

NewMessagesStateGraph() #

创建一个带有默认消息模式的状态图,推荐用于基于聊天的代理。

签名: func NewMessagesStateGraph() *StateGraph

返回值: 配置好的状态图实例

使用示例:

workflow := graph.NewMessagesStateGraph()

核心方法 #

AddNode(name string, fn func) void #

向状态图添加新节点。

参数:

签名: func (g *StateGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error))

使用示例:

workflow.AddNode("agent", func(ctx context.Context, state interface{}) (interface{}, error) {
    // 处理逻辑
    return newState, nil
})

AddEdge(from string, to string) void #

添加静态边连接两个节点。

参数:

签名: func (g *StateGraph) AddEdge(from, to string)

使用示例:

workflow.AddEdge("agent", "tools")

AddConditionalEdge(from string, condition func) void #

添加条件边,在运行时根据条件决定目标节点。

参数:

签名: func (g *StateGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)

使用示例:

workflow.AddConditionalEdge("agent", func(ctx context.Context, state interface{}) string {
    // 基于状态判断下一个节点
    return "tools"
})

SetEntryPoint(name string) void #

设置入口节点。

参数:

签名: func (g *StateGraph) SetEntryPoint(name string)

使用示例:

workflow.SetEntryPoint("agent")

SetRetryPolicy(policy *RetryPolicy) void #

设置重试策略。

参数:

签名: func (g *StateGraph) SetRetryPolicy(policy *RetryPolicy)

使用示例:

retryPolicy := &graph.RetryPolicy{
    MaxRetries: 3,
    BackoffStrategy: graph.ExponentialBackoff,
    RetryableErrors: []string{"timeout", "network"},
}
workflow.SetRetryPolicy(retryPolicy)

SetStateMerger(merger StateMerger) void #

设置状态合并器。

参数:

签名: func (g *StateGraph) SetStateMerger(merger StateMerger)

SetSchema(schema StateSchema) void #

设置状态模式。

参数:

签名: func (g *StateGraph) SetSchema(schema StateSchema)

编译与执行 #

Compile() (*StateRunnable, error) #

编译状态图并返回可执行实例。

返回值:

签名: func (g *StateGraph) Compile() (*StateRunnable, error)

使用示例:

runnable, err := workflow.Compile()
if err != nil {
    log.Fatal(err)
}

Invoke(ctx context.Context, initialState interface{}) (interface{}, error) #

执行编译后的状态图。

参数:

返回值:

签名: func (r *StateRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)

InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error) #

带配置执行状态图。

参数:

签名: func (r *StateRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)

图表来源

消息图 API #

MessageGraph 构造函数 #

NewMessageGraph() #

创建新的消息图实例。

签名: func NewMessageGraph() *MessageGraph

返回值: 新的消息图实例

使用示例:

workflow := graph.NewMessageGraph()

核心方法 #

消息图的方法与状态图基本相同,但主要用于消息传递模式:

// 添加节点
workflow.AddNode("process", processFunction)

// 设置入口
workflow.SetEntryPoint("process")

// 添加边
workflow.AddEdge("process", "END")

// 编译
runnable, err := workflow.Compile()

编译与执行 #

Compile() (*Runnable, error) #

编译消息图。

返回值:

签名: func (g *MessageGraph) Compile() (*Runnable, error)

Invoke(ctx context.Context, initialState interface{}) (interface{}, error) #

执行消息图。

签名: func (r *Runnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)

InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error) #

带配置执行消息图。

签名: func (r *Runnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)

图表来源

预构建组件 #

CreateAgent() #

创建智能代理,支持工具调用和对话管理。

签名: func CreateAgent(model llms.Model, inputTools []tools.Tool, opts ...CreateAgentOption) (*graph.StateRunnable, error)

参数:

返回值:

配置选项:

使用示例:

agent, err := prebuilt.CreateAgent(
    model,
    []tools.Tool{calculator, webSearch},
    prebuilt.WithSystemMessage("你是一个有用的助手"),
    prebuilt.WithStateModifier(customModifier),
)

CreateReactAgent() #

创建 ReAct 格式的代理,遵循推理-行动-观察循环。

签名: func CreateReactAgent(model llms.Model, inputTools []tools.Tool) (*graph.StateRunnable, error)

参数:

返回值:

使用示例:

reactAgent, err := prebuilt.CreateReactAgent(model, tools)

CreateSupervisor() #

创建监督者代理,协调多个子代理。

签名: func CreateSupervisor(model llms.Model, members map[string]*graph.StateRunnable) (*graph.StateRunnable, error)

参数:

返回值:

使用示例:

supervisor, err := prebuilt.CreateSupervisor(model, map[string]*graph.StateRunnable{
    "researcher": researcherAgent,
    "writer": writerAgent,
})

RAG 组件 #

RAGPipeline #

检索增强生成(RAG)管道组件。

签名: func NewRAGPipeline(config *RAGConfig) *RAGPipeline

配置选项:

方法:

使用示例:

config := &prebuilt.RAGConfig{
    TopK: 4,
    SystemPrompt: "回答问题时要准确且有帮助",
}
pipeline := prebuilt.NewRAGPipeline(config)
err := pipeline.BuildBasicRAG()
if err != nil {
    log.Fatal(err)
}
runnable, err := pipeline.Compile()

图表来源

检查点系统 #

CheckpointStore 接口 #

定义检查点存储的标准接口。

接口方法:

内存检查点存储 #

MemoryCheckpointStore #

基于内存的检查点存储实现。

签名: func NewMemoryCheckpointStore() *MemoryCheckpointStore

方法:

文件检查点存储 #

FileCheckpointStore #

基于文件的检查点存储实现。

签名: func NewFileCheckpointStore(writer io.Writer, reader io.Reader) *FileCheckpointStore

检查点配置 #

CheckpointConfig #

检查点配置结构。

字段:

DefaultCheckpointConfig() #

返回默认检查点配置。

签名: func DefaultCheckpointConfig() CheckpointConfig

检查点可执行对象 #

CheckpointableRunnable #

扩展 Runnable 以支持检查点功能。

方法:

图表来源

流式处理 #

StreamMode 枚举 #

定义流式处理模式。

常量:

StreamConfig #

流式处理配置。

字段:

DefaultStreamConfig() #

返回默认流式配置。

签名: func DefaultStreamConfig() StreamConfig

StreamResult #

流式执行结果。

字段:

StreamingListener #

实现 NodeListener 以支持事件流式传输。

方法:

StreamingRunnable #

扩展 Runnable 以支持流式处理。

方法:

StreamingExecutor #

高级流式执行器。

方法:

图表来源

工具集成 #

ExaSearch #

Exa API 搜索工具。

构造函数: 签名: func NewExaSearch(apiKey string, opts ...ExaOption) (*ExaSearch, error)

选项:

方法:

使用示例:

exa, err := tool.NewExaSearch("", tool.WithExaNumResults(10))
if err != nil {
    log.Fatal(err)
}

result, err := exa.Call(ctx, "Go programming language")
if err != nil {
    log.Fatal(err)
}

TavilySearch #

Tavily API 搜索工具。

构造函数: 签名: func NewTavilySearch(apiKey string, opts ...TavilyOption) (*TavilySearch, error)

选项:

方法:

使用示例:

tavily, err := tool.NewTavilySearch("", tool.WithTavilySearchDepth("advanced"))
if err != nil {
    log.Fatal(err)
}

result, err := tavily.Call(ctx, "最新的人工智能发展")
if err != nil {
    log.Fatal(err)
}

ToolExecutor #

工具执行器。

构造函数: 签名: func NewToolExecutor(inputTools []tools.Tool) *ToolExecutor

方法:

使用示例:

executor := prebuilt.NewToolExecutor([]tools.Tool{exa, tavily})

// 执行单个工具
result, err := executor.Execute(ctx, prebuilt.ToolInvocation{
    Tool:      "Exa_Search",
    ToolInput: "机器学习教程",
})

// 执行多个工具
invocations := []prebuilt.ToolInvocation{
    {Tool: "Exa_Search", ToolInput: "AI"},
    {Tool: "Tavily_Search", ToolInput: "最新技术"},
}
results, err := executor.ExecuteMany(ctx, invocations)

图表来源

状态模式与架构 #

StateSchema 接口 #

定义状态结构和更新逻辑。

接口方法:

MapSchema #

基于映射的状态模式实现。

构造函数: 签名: func NewMapSchema() *MapSchema

方法:

Reducers #

OverwriteReducer #

覆盖旧值。

签名: func OverwriteReducer(current, new interface{}) (interface{}, error)

AppendReducer #

追加到切片。

签名: func AppendReducer(current, new interface{}) (interface{}, error)

重试策略 #

RetryPolicy #

字段:

BackoffStrategy 枚举 #

图表来源

配置与上下文 #

Config 结构 #

执行配置。

字段:

中断机制 #

Interrupt(ctx context.Context, value interface{}) (interface{}, error) #

暂停执行并等待输入。

参数:

返回值:

WithResumeValue(ctx context.Context, value interface{}) context.Context #

在上下文中添加恢复值。

签名: func WithResumeValue(ctx context.Context, value interface{}) context.Context

GetResumeValue(ctx context.Context) interface{} #

从上下文中获取恢复值。

签名: func GetResumeValue(ctx context.Context) interface{}

错误处理 #

GraphInterrupt #

执行中断错误。

字段:

图表来源

总结 #

langgraphgo 提供了一个强大而灵活的框架来构建复杂的 AI 应用程序。其核心特性包括:

  1. 双图架构: 支持状态图和消息图两种模式
  2. 并发执行: 自动并行处理多个节点
  3. 灵活的状态管理: 通过状态模式和 reducer 实现复杂的状态操作
  4. 持久化支持: 检查点系统支持长时间运行的应用
  5. 实时反馈: 流式处理提供实时事件流
  6. 工具集成: 内置多种外部工具集成
  7. 可扩展性: 丰富的配置选项和自定义能力

该库特别适合需要复杂工作流程、状态管理和持久化的 AI 应用场景,如智能代理、RAG 系统和多步骤任务处理。